{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 简介\n",
    "MLlib设计理念是将数据以RDD的形式表示，然后在分布式数据集上调用各种算法。其实，MLlib就是RDD上一系列可供调用的函数的集合"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "##数据类型\n",
    "\n",
    "    1. org.apache.spark.mllib（java/scala）\n",
    "    2. pysqark.mllib(python版本)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Vector向量\n",
    "\n",
    "本地向量local vector，索引从0开始，并且是整数，值是double类型，储存在单个机器中，稠密向量表示向量的每一位都储存，后者只储存非零位以节约空间\n",
    "，通过mllib.linalg.Vectors类创建"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.mllib.linalg import Vectors\n",
    "den=Vectors.dense([1.0,2.0,3.0])\n",
    "print(den)\n",
    "\n",
    "'''\n",
    "SparkVector创建方式有两种：\n",
    "1.Vector.sparse(向量长度，索引数组，与索引数组所对应的数值数组)\n",
    "2.Vector.sparse(向量长度，(索引，数值)，(索引，数值)。。。。)\n",
    "\n",
    "'''\n",
    "\n",
    "spa=Vectors.sparse(4,[0,2],[1.0,2.0])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### LabeledPoint\n",
    "在监督学习算法中使用supervised learing，LabeledPoint表示带标签的数据点，包括一个特征向量与一个标签（有一个浮点数表示），位于mllib.regression包里"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.mllib.regression import LabeledPoint\n",
    "from pyspark.mllib.linelg import Vectors\n",
    "pos=LabeledPoint(1.0,Vectors.dense([1.0,2.0,3.0]))\n",
    "neg=LabeledPoint(0.0,Vectors.dense([1.0,2.0,3.0]))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Matrix\n",
    "\n",
    "1. 稠密矩阵：实体值以列为主要次序的形式，存放在单个Double型数组中\n",
    "2. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Rating\n",
    "用于产品推荐，表示用户对一个产品的评分，位于mllib.recommendation包中"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 各类Model类（模型）\n",
    "\n",
    "每个Model都是训练算法的结果，每个模型都有一个predict（）方法"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 统计\n",
    "不论是在即时的探索中，还是在机器学习的数据理解中，基本的统计都是数据分析的重要部分。MLlib 通过mllib.stat.Statistics 类中的方法提供了几种广泛使用的统计函数，这些函数可以直接在RDD 上使用。\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 算法\n",
    "\n",
    "\n",
    "    1.特征提取\n",
    "    2.降维\n",
    "    3.分类与回归\n",
    "    4.聚类\n",
    "    5.协同过滤和推荐"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#加载模块\n",
    "from pyspark.mllib.util import MLUtils\n",
    "from pyspark.mllib.Classification import SVMWithSGD\n",
    "\n",
    "#读取数据\n",
    "dataFile='filepath'\n",
    "data=MLUtils.loadlibSVMFile(sc,dataFile)\n",
    "\n",
    "\n",
    "#划分数据集\n",
    "splits=data.randomSplit([0.8,0.2],seed=9L)\n",
    "training=splits[0].cache()\n",
    "test=splits[1]\n",
    "\n",
    "#打印分割后的数据集\n",
    "print('TrainingCount:%s' % training.count())\n",
    "print('TestingCount:%s' % test.count())\n",
    "\n",
    "#构建模型\n",
    "model=SVMWithSGD.train(training,100)\n",
    "scoreAndLables=test.map(lambda point:(model.predict(point.features),point.label))\n",
    "#输出结果，包含预测的数字结果和0/1结果：\n",
    "for score,label in scoreAndLables:\n",
    "    print (score ,label)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#每次使用Spark都需要先构建SparkSession，因此我们导入pyspark.sql库并初始化一个SparkSession\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "spark=SparkSession.builder.getOrCreate()\n",
    "#加载数据\n",
    "data =spark.read.csv('./boston_housing.csv',header=True,inferSchema=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#创建特征数组，只需要引入VectorAssembler类并传入特征向量的列名称即可\n",
    "feature_columns=data.columns[:-1]   \n",
    "from pyspark.ml.feature import VectorAssembler\n",
    "assemler=VectorAssembler(inputCols=feature_columns,outputCol='features')\n",
    "data_2=assemler.transform(data)\n",
    "\n",
    "#训练集和测试集的划分，这里使用RandomSplit函数\n",
    "train,test=data_2.randomSplit([0.7,0.3])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "训练与评估模型，与平时我们训练和评估模型一样，只不过在spark中我们使用的是spark为我们提供的算法函数，\n",
    "在spark中我们需要从pyspark.ml中导入算法函数，使用model.transform()函数进行预测，这个和之前用的model.predict()还是有区别的"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.regression import LinearRegression\n",
    "algo=LinerRegression(featureCol='features',labelCol='medv')\n",
    "model=algo.fit(train)\n",
    "evalution_summary=model.evaluate(test)\n",
    "predictions=model.transfrom(test)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 完整代码"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 首先导入findspark库并通过传递Apache Spark文件夹的路径进行初始化。\n",
    "import findspark\n",
    "findspark.init('/opt/spark')\n",
    "\n",
    "#create a sparksession\n",
    "from pyspark.sql import SparkSession\n",
    "spark=SparkSession.builder.getOrCreate()\n",
    "#load data\n",
    "data=spark.read.csv('./boston_housing.csv',header=True,inferSchema=True)\n",
    "\n",
    "#creat features vectors\n",
    "from pyspark.ml.feature import VectorAssembler\n",
    "feature_columns = data.columns[:-1]\n",
    "assembler = VectorAssembler(inputCols=feature_columns,outputCol=\"features\")\n",
    "data_2 = assembler.transform(data)\n",
    "\n",
    "#train/test split\n",
    "train, test = data_2.randomSplit([0.7, 0.3])\n",
    "\n",
    "# define the model\n",
    "from pyspark.ml.regression import LinearRegression\n",
    "algo = LinearRegression(featuresCol=\"features\", labelCol=\"medv\")\n",
    "\n",
    "# train the model\n",
    "model = algo.fit(train)\n",
    "# evaluation\n",
    "evaluation_summary = model.evaluate(test)\n",
    "evaluation_summary.meanAbsoluteError\n",
    "evaluation_summary.rootMeanSquaredError\n",
    "evaluation_summary.r2\n",
    "# predicting values\n",
    "predictions = model.transform(test)\n",
    "predictions.select(predictions.columns[13:]).show() # here I am filtering out some columns just for the figure to fit"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
